[New Feature]: Support SP + DP parallel on Wan training #1223
mahaocong90 wants to merge 4 commits into modelscope:main
Conversation
… DP + SP grouping configuration. The computational logic follows: DP = world_size / SP.
Summary of Changes

Hello @mahaocong90, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the distributed training capabilities for WAN models by introducing support for combined Sequence Parallelism (SP) and Data Parallelism (DP). The changes enable users to specify a sequence parallelism size, which then configures the training pipeline, data loading, and loss calculation to operate efficiently across multiple GPUs, distributing both data batches and sequence elements. This aims to improve training scalability and performance, particularly for large models and long sequences.

Highlights
Code Review
This pull request introduces support for Sequence Parallelism (SP) combined with Data Parallelism (DP) for Wan training. The changes primarily involve adding and propagating the sp_size configuration to correctly set up the distributed environment and data loaders. The approach to ensure data consistency across SP ranks by synchronizing random noise and timesteps is sound. However, I've identified a critical issue in the build_dataloader function concerning worker initialization that will cause a runtime error. Additionally, there are several medium-severity issues, including code duplication and leftover debugging print statements, which should be addressed to enhance code quality and maintainability.
diffsynth/diffusion/runner.py
Outdated
```python
def worker_seed_init(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

dataloader_kwargs = dict(
    dataset=dataset,
    sampler=sampler,
    num_workers=num_workers,
    pin_memory=True,
    worker_init_fn=worker_seed_init,
```
There are a few critical issues with the worker_seed_init function and its usage that will lead to runtime errors and incorrect behavior:
- Missing Imports: The `random` and `np` (numpy) modules are used but not imported in this file, which will cause a `NameError`.
- Incorrect Signature: The `worker_init_fn` for a DataLoader receives the `worker_id` as an argument. The function signature should be `def worker_init_fn(worker_id):`.
- Unused Seed: The `seed` parameter passed to `build_dataloader` is not being used to seed the workers. Each worker should be seeded differently based on a base seed and its ID to ensure reproducibility.
Here's a suggested fix that addresses these points. Please also remember to add `import random` and `import numpy as np` at the top of the file.
```diff
-def worker_seed_init(seed):
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-dataloader_kwargs = dict(
-    dataset=dataset,
-    sampler=sampler,
-    num_workers=num_workers,
-    pin_memory=True,
-    worker_init_fn=worker_seed_init,
+def worker_init_fn(worker_id):
+    worker_seed = seed + worker_id
+    random.seed(worker_seed)
+    np.random.seed(worker_seed)
+    torch.manual_seed(worker_seed)
+dataloader_kwargs = dict(
+    dataset=dataset,
+    sampler=sampler,
+    num_workers=num_workers,
+    pin_memory=True,
+    worker_init_fn=worker_init_fn,
```
It seems unnecessary to specify worker_init_fn during dataloader initialization. It was only used to fix randomness when aligning loss precision, so I’ve removed it for now.
diffsynth/diffusion/loss.py
Outdated
```python
max_timestep_boundary = int(inputs.get("max_timestep_boundary", 1) * len(pipe.scheduler.timesteps))
min_timestep_boundary = int(inputs.get("min_timestep_boundary", 0) * len(pipe.scheduler.timesteps))
```
diffsynth/diffusion/runner.py
Outdated
| print(f"accelerator.processid={rank}, accelerator.num_processes={world_size}, " | ||
| f"sp_size={sp_size}, dp_size={dp_size}, dp_rank={dp_rank}") | ||
| else: | ||
| if accelerator is not None: | ||
| dp_size = accelerator.num_processes | ||
| dp_rank = accelerator.process_index | ||
| else: | ||
| raise ValueError(f"Accelerator is None.") | ||
| print(f"dp_size={dp_size}, dp_rank={dp_rank}") |
@mahaocong90 Could you please specify how to solve this issue in detail, e.g., which files in xDiT are to be modified? Thanks.
Yes, my fix is like this: This fix works well on my PyTorch 2.8.0 environment.
@mahaocong90 Thanks for your prompt response! Before fixing this issue, there are warnings as follows:

```
/root/anaconda3/envs/diffsynth/lib/python3.11/site-packages/torch/autograd/graph.py:865: UserWarning: c10d::allgather_base: an autograd kernel was not registered to the Autograd key(s) but we are trying to backprop through it. This may lead to silently incorrect behavior. This behavior is deprecated and will be removed in a future version of PyTorch. If your operator is differentiable, please ensure you have registered an autograd kernel to the correct Autograd key (e.g. DispatchKey::Autograd, DispatchKey::CompositeImplicitAutograd). If your operator is not differentiable, or to squash this warning and use the previous behavior, please register torch::CppFunction::makeFallthrough() to DispatchKey::Autograd. (Triggered internally at /pytorch/torch/csrc/autograd/autograd_not_implemented_fallback.cpp:76.)
```

Following your solution, the above warning disappears during training. It works!

PS: same xfuser version, with pytorch version 2.10.0+cu128.
Additionally, my PR still has some issues. Testing revealed that errors still occur when using DeepSpeed. This might be because, in order to ensure that all SP ranks within the SP group get the same sampler when using USP, I removed the dataloader wrapper applied by Accelerate's prepare method. However, this causes DeepSpeed to raise an error with an empty message. It appears that manual configuration is necessary after Accelerate initialization; I am trying to fix this issue.
Oh, I noticed the differences you mentioned.
Yes, I think it is necessary to remove Accelerate's wrapping of the dataloader when using USP. After Accelerate wraps the dataloader, the sampler iterates as follows (my accelerate version is 1.10.1):

Case 1. DataLoaderConfiguration parameters are split_batches=False, dispatch_batches=False

Case 2. DataLoaderConfiguration parameters are split_batches=False, dispatch_batches=True

So it seems that the Accelerate dataloader wrapper is not suitable for SP parallelism. You can print the prompt associated with the training video to check whether the ranks within the same SP group get the same sample at each step. A sketch of how such a dataloader can be built is shown below.
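To make this concrete, here is a minimal sketch of a dataloader whose sampler shards only across DP groups, so that all ranks inside one SP group see the same sample each step (DP = world_size / SP). It assumes consecutive ranks form one SP group; the function and argument names are illustrative and not the PR's actual `build_dataloader`:

```python
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler

def build_sp_aware_dataloader(dataset, sp_size, batch_size=1, num_workers=4, seed=0):
    """Shard the dataset across DP groups only: ranks in the same SP group share
    the same dp_rank and therefore draw identical samples every step."""
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    dp_size = world_size // sp_size
    dp_rank = rank // sp_size          # ranks 0..sp_size-1 -> dp_rank 0, and so on

    sampler = DistributedSampler(
        dataset,
        num_replicas=dp_size,          # shard over DP groups, not over all ranks
        rank=dp_rank,                  # identical within one SP group
        shuffle=True,
        seed=seed,
    )
    dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler,
                            num_workers=num_workers, pin_memory=True)
    return dataloader, sampler
```

Calling `sampler.set_epoch(epoch_id)` at the start of every epoch then varies the shuffle order across epochs while keeping it identical for all ranks that share a `dp_rank`.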
Thanks for your detailed explanation! In my case, I use the following code snippets:

```python
model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
for epoch_id in range(num_epochs):
    sampler.set_epoch(epoch_id)
    for data in tqdm(dataloader):
        with accelerator.accumulate(model):
            optimizer.zero_grad()
            if dataset.load_from_cache:
                loss = model({}, inputs=data)
            else:
                loss = model(data)
            if dp_rank == 0 and (sp_size <= 1 or sp_rank == 0):
                with open('loss_dp0.txt', 'a') as f:
                    f.write(str(loss.item()) + '\n')
            if dp_rank == 1 and (sp_size <= 1 or sp_rank == 0):
                with open('loss_dp1.txt', 'a') as f:
                    f.write(str(loss.item()) + '\n')
            accelerator.backward(loss)
            optimizer.step()
            model_logger.on_step_end(accelerator, model, save_steps, loss=loss)
            scheduler.step()
    if save_steps is None:
        model_logger.on_epoch_end(accelerator, model, epoch_id)
model_logger.on_training_end(accelerator, model, save_steps)
```

I still have two questions.
```shell
CUDA_VISIBLE_DEVICES=0,1 accelerate launch examples/wanvideo/model_training/train.py \
--dataset_base_path data/example_video_dataset \
--dataset_metadata_path data/example_video_dataset/metadata_s2v.csv \
--height 240 \
--width 416 \
--dataset_repeat 10 \
--model_id_with_origin_paths "Wan-AI/Wan2.1-FLF2V-14B-720P:diffusion_pytorch_model*.safetensors,Wan-AI/Wan2.1-FLF2V-14B-720P:models_t5_umt5-xxl-enc-bf16.pth,Wan-AI/Wan2.1-FLF2V-14B-720P:Wan2.1_VAE.pth,Wan-AI/Wan2.1-FLF2V-14B-720P:models_clip_open-clip-xlm-roberta-large-vit-huge-14.pth" \
--learning_rate 1e-4 \
--num_epochs 1 \
--remove_prefix_in_ckpt "pipe.dit." \
--output_path "./models/train/Wan2.1-FLF2V-14B-720P_lora" \
--lora_base_model "dit" \
--lora_target_modules "q,k,v,o,ffn.0,ffn.2" \
--lora_rank 32 \
--extra_inputs "input_image,end_image" \
--sp_size 1
```

and

```shell
accelerate launch examples/wanvideo/model_training/train.py \
--dataset_base_path data/example_video_dataset \
--dataset_metadata_path data/example_video_dataset/metadata_s2v.csv \
--height 240 \
--width 416 \
--dataset_repeat 10 \
--model_id_with_origin_paths "Wan-AI/Wan2.1-FLF2V-14B-720P:diffusion_pytorch_model*.safetensors,Wan-AI/Wan2.1-FLF2V-14B-720P:models_t5_umt5-xxl-enc-bf16.pth,Wan-AI/Wan2.1-FLF2V-14B-720P:Wan2.1_VAE.pth,Wan-AI/Wan2.1-FLF2V-14B-720P:models_clip_open-clip-xlm-roberta-large-vit-huge-14.pth" \
--learning_rate 1e-4 \
--num_epochs 1 \
--remove_prefix_in_ckpt "pipe.dit." \
--output_path "./models/train/Wan2.1-FLF2V-14B-720P_lora" \
--lora_base_model "dit" \
--lora_target_modules "q,k,v,o,ffn.0,ffn.2" \
--lora_rank 32 \
--extra_inputs "input_image,end_image" \
--sp_size 4
```

I have already input a fixed seed in `get_pipeline_inputs`:

```python
def get_pipeline_inputs(self, data):
    inputs_posi = {"prompt": data["prompt"]}
    inputs_nega = {}
    inputs_shared = {
        # Assume you are using this pipeline for inference,
        # please fill in the input parameters.
        "input_video": data["video"],
        "height": data["video"][0].size[1],
        "width": data["video"][0].size[0],
        "num_frames": len(data["video"]),
        # Please do not modify the following parameters
        # unless you clearly know what this will cause.
        "cfg_scale": 1,
        "tiled": False,
        "rand_device": self.pipe.device,
        "use_gradient_checkpointing": self.use_gradient_checkpointing,
        "use_gradient_checkpointing_offload": self.use_gradient_checkpointing_offload,
        "cfg_merge": False,
        "vace_scale": 1,
        "max_timestep_boundary": self.max_timestep_boundary,
        "min_timestep_boundary": self.min_timestep_boundary,
        "seed": 42
    }
    inputs_shared = self.parse_extra_inputs(data, self.extra_inputs, inputs_shared)
    return inputs_shared, inputs_posi, inputs_nega
```

May I know how you align the losses as your figure indicates? Thanks.
OK, let me introduce my work. For the TI2V model, the forward calculation involves the following steps: get data from the dataset, text embedding, VAE encoder, DiT, then compute the predicted noise and the loss.

Step 1. Ensure that the input is consistent, i.e., within the same SP group, each SP rank should get the same sample each time.

Step 2. DiffSynth generates random noise and a timestep_id before each DiT forward pass, and from these builds the noisy input and the modulation input. Therefore it is also necessary to ensure that the noise and timestep on every SP rank are the same. I generate the random values on SP rank 0 and then broadcast them to the other SP ranks (see the sketch after this list). Alternatively, the same random-seed generator can be created on every SP rank, with identical seeds at each step, so that the same random inputs are obtained.

Step 3. Next, make sure any other randomness in the computation is fixed. For instance, if your algorithm adds random frames to the input or injects additional random noise, those random seeds also need to be fixed.

Step 4. Once the inputs are fully aligned, you can test whether the forward loss values are equal. If they are not completely consistent, you have to print intermediate tensors and compare them step by step. This is a troublesome task; you can reduce the number of model layers to simplify the problem (e.g. change the 24 DiT blocks of Wan2.2 TI2V-5B to 1).

Step 5. When the forward loss is aligned, add the backward computation. Torch's autograd automatically handles gradient calculation and the weight-update process. The problem I encountered was caused by an operator that did not provide a backward, which broke the computational graph. Then check whether any activation gradient is 0 / NaN / inf, and locate where the anomaly first appears (tensor.register_hook(grad_hook) is called when backward computes the gradients). The order of gradient computation is the reverse of the forward pass; if 0 / NaN / inf appears from a certain step onward, that operator may not provide a backward or may compute an incorrect gradient.

This is my work, and you are welcome to discuss and add more.
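As a rough illustration of step 2, here is a minimal sketch of broadcasting the noise and timestep from SP rank 0 to the rest of the SP group. The function name, the `sp_group` handle, and the assumption that `timestep_id` is already a tensor on the correct device are illustrative, not the PR's actual code:

```python
import torch
import torch.distributed as dist

def sync_noise_and_timestep(noise: torch.Tensor, timestep_id: torch.Tensor, sp_group):
    """Broadcast noise and timestep_id from SP rank 0 so that every rank in the
    SP group builds the same noisy input and modulation input (step 2 above)."""
    src = dist.get_global_rank(sp_group, 0)   # global rank of the group-local rank 0
    dist.broadcast(noise, src=src, group=sp_group)
    dist.broadcast(timestep_id, src=src, group=sp_group)
    return noise, timestep_id
```

And for step 5, a sketch of the gradient-hook idea mentioned above, with a hypothetical helper name:

```python
def attach_grad_checks(named_activations):
    """Register hooks on intermediate activations so the first place where a
    gradient becomes 0 / NaN / inf during backward gets reported (step 5 above)."""
    def make_hook(name):
        def grad_hook(grad):
            if not torch.isfinite(grad).all() or grad.abs().sum() == 0:
                print(f"[grad check] suspicious gradient at {name}: "
                      f"finite={torch.isfinite(grad).all().item()}, abs_sum={grad.abs().sum().item()}")
            return grad
        return grad_hook

    for name, tensor in named_activations.items():
        if tensor.requires_grad:
            tensor.register_hook(make_hook(name))
```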
… and DeepSpeed zero2, where train_micro_batch_size_per_gpu must be specified in the json.
This PR adds SP + DP support for WAN training through the USP interface.
Description
Added a new configuration parameter `--sp_size`. USP will be enabled when `--sp_size` > 1. This is an example of enabling sequence parallelism, running on one node with 8 GPUs, SP = 4, DP = 2; the sketch below shows how ranks are partitioned under this configuration.
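A minimal sketch of the SP/DP grouping math, assuming consecutive ranks form one SP group; the helper name is illustrative, and the PR itself sets these groups up through the USP interface from xDiT/xfuser rather than this code:

```python
import torch.distributed as dist

def build_sp_group(sp_size: int):
    """Partition world ranks into SP groups of size sp_size (DP = world_size / SP).
    With 8 GPUs and sp_size=4 the SP groups are {0,1,2,3} and {4,5,6,7}, giving dp_size=2."""
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    assert world_size % sp_size == 0, "world_size must be divisible by sp_size"

    sp_group = None
    for dp_rank in range(world_size // sp_size):
        ranks = list(range(dp_rank * sp_size, (dp_rank + 1) * sp_size))
        group = dist.new_group(ranks)  # every rank must take part in every new_group call
        if rank in ranks:
            sp_group = group
    return sp_group
```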
Environment version
Loss Accuracy
I used the Wan2.2 TI2V-5B model for debugging. Functional testing was performed using the Databoost/VidData dataset from Hugging Face, which contains 1,006 MP4 video files and a CSV description file conforming to the DiffSynth dataset format.
To ensure that sequence parallelism (SP) does not affect the loss outcome, I aligned the loss between a dp=2 + sp=4 run and a dp=2 run, making sure the number of gradient-accumulation steps per weight update was identical. Then I fixed the random seeds for the sampler, noise, and timestep generation to guarantee identical inputs.
The launch parameters for the dp=2 configuration:
Forward process loss comparison (Steps 0-100):


Backward process loss comparison (Epoch=2, global steps=1006):
During debugging of the backward pass, I ran into two issues related to xDiT. The latest release of the base library did not appear to have merged the PRs addressing these fixes, so I manually patched xDiT in my debugging environment to guarantee correct backward functionality.
Issues for xDiT
Issue 1 is related to PR xdit-project/xDiT#598:
"xFuserRingFlashAttnFunc has 17 inputs (including ctx), but it inherits the backward() method from RingFlashAttnFunc which only returns 16 values (3 gradients + 13 Nones)!"
Issue 2: The all_gather interface in xDiT uses torch's all_gather_into_tensor. As of the torch 2.9 release, this interface still does not provide a backward method and cannot support autograd. The corresponding commit in the torch community (pytorch/pytorch#168140) has not yet been merged. Therefore, a simple replacement of all_gather_into_tensor in xDiT with torch.distributed.nn.functional.all_gather is applied here to enable autograd support.
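For reference, a minimal sketch of what such a replacement could look like; the wrapper name is hypothetical and the actual patch applied to xDiT may differ:

```python
import torch
from torch.distributed.nn.functional import all_gather as all_gather_autograd

def all_gather_with_grad(tensor: torch.Tensor, group=None) -> torch.Tensor:
    """Differentiable stand-in for torch.distributed.all_gather_into_tensor:
    gather the shards from every rank and concatenate them along dim 0,
    keeping autograd intact so gradients flow back through the collective."""
    gathered = all_gather_autograd(tensor, group=group)  # tuple of tensors, one per rank
    return torch.cat(gathered, dim=0)
```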
Additionally, I have only tested Wan 2.2, Ti2V-5B and a small dataset on my development environment. The results appear to align with expectations. I will conduct further tests using other Wan models to confirm that there is no impact on other core functionalities.
Thanks